热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

竞选|和会_Flink1.15源码解析启动JobManagerWebMonitorEndpoint启动

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Flink1.15源码解析--启动JobManager----WebMonitorEndpoint启动相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Flink1.15源码解析--启动JobManager----WebMonitorEndpoint启动相关的知识,希望对你有一定的参考价值。



文章目录


  • 一、前言
  • 二、WebMonitorEndpoint 构建
    • 2.1、restEndpointFactory 的初始化
    • 2.2、createRestEndpoint 创建 WebMonitorEndpoint

  • 三、WebMonitorEndpoint 启动
    • 3.1、Router
    • 3.2、注册了一堆Handler
    • 3.3、Netty启动的相关操作
      • 3.3.1、 ChannelInitializer 初始化
      • 3.3.2、NioEventLoopGroup 初始化
      • 3.3.3、绑定 rest endpoint
      • 3.3.4、restAddress 启动成功
      • 3.3.5、修改 EndPoint 状态为 RUNNING, 到这里 WebMonitorEndpoint 的 Netty 服务就启动完毕了

    • 3.4、钩子来启动子类特定的服务。
      • 3.4.1、 节点选举
        • 3.4.1.1、以 standalone 模式启动为例 创建的 StandaloneLeaderElectionService 对象
        • 3.4.1.2、以 zookeeper HA 模式启动为例 创建的 DefaultLeaderElectionService对象



  • 四、总结
  • 返回[Flink1.15源码解析-总目录](https://blog.csdn.net/wuxintdrh/article/details/127796678)


一、前言

从上文 Flink1.15源码解析---- DispatcherResourceManagerComponent 我们知道WebMonitorEndpoint的创建及启动

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

// 构建了一个线程池用于执行 WebMonitorEndpointEndpoint 所接收到的client发送过来的请求
final ScheduledExecutorService executor =
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
// 初始化 MetricFetcher, 默认刷新间隔是10s
final long updateInterval =
configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =
updateInterval == 0
? VoidMetricFetcher.INSTANCE
: MetricFetcherImpl.fromConfiguration(
configuration,
metricQueryServiceRetriever,
dispatcherGatewayRetriever,
executor);
// 创建 三大组件之 WebMonitorEndpoint
webMonitorEndpoint =
restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
fatalErrorHandler);
// 启动 三大组件之 WebMonitorEndpoint
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();

本文我们将详细的梳理 WebMonitorEndpoint 的构建与启动


二、WebMonitorEndpoint 构建

WebMonitorEndpoint 由 restEndpointFactory 构建, restEndpointFactory 的初始化 由 DispatcherResourceManagerComponentFactory 根据启动方式不同

接下来我们以 StandaloneSessionClusterEntrypoint 为例 看 restEndpointFactory 的初始化


2.1、restEndpointFactory 的初始化

1、StandaloneSessionClusterEntrypoint 创建 DefaultDispatcherResourceManagerComponentFactory
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#createDispatcherResourceManagerComponentFactory

@Override
protected DefaultDispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory(Configuration configuration)
return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
StandaloneResourceManagerFactory.getInstance());

2、createSessionComponentFactory 包含三大组件工厂的创建
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createSessionComponentFactory

public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
ResourceManagerFactory<?> resourceManagerFactory)
return new DefaultDispatcherResourceManagerComponentFactory(
DefaultDispatcherRunnerFactory.createSessionRunner(
SessionDispatcherFactory.INSTANCE),
resourceManagerFactory,
SessionRestEndpointFactory.INSTANCE);

restEndpointFactory 是 SessionRestEndpointFactory.INSTANCE


2.2、createRestEndpoint 创建 WebMonitorEndpoint

RestEndpointFactory 创建 DispatcherRestEndpoint

/** &#64;link RestEndpointFactory which creates a &#64;link DispatcherRestEndpoint. */
public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway>
INSTANCE;
&#64;Override
public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
Configuration configuration,
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
TransientBlobService transientBlobService,
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler)
throws Exception
final RestHandlerConfiguration restHandlerConfiguration &#61;
RestHandlerConfiguration.fromConfiguration(configuration);
// 创建 DispatcherRestEndpoint
return new DispatcherRestEndpoint(
dispatcherGatewayRetriever,
configuration,
restHandlerConfiguration,
resourceManagerGatewayRetriever,
transientBlobService,
executor,
metricFetcher,
leaderElectionService,
RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
fatalErrorHandler);


创建的 DispatcherRestEndpoint 是 Dispatcher 的 REST endpoint

/** REST endpoint for the &#64;link Dispatcher component. */
public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway>
//......


三、WebMonitorEndpoint 启动

实际调用 org.apache.flink.runtime.rest.RestServerEndpoint#start


3.1、Router

// 1、首先创建Router&#xff0c;来解析Client的请求并寻找对应的Handler
final Router router &#61; new Router();

3.2、注册了一堆Handler

// 2、 注册了一堆Handler
// 2.1、初始化 handlers
final CompletableFuture<String> restAddressFuture &#61; new CompletableFuture<>();
handlers &#61; initializeHandlers(restAddressFuture);
// 2.2、将这些Handler进行排序&#xff0c;这里的排序是为了确认URL和Handler一对一的关系
/* sort the handlers such that they are ordered the following:
* /jobs
* /jobs/overview
* /jobs/:jobid
* /jobs/:jobid/config
* /:*
*/

Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
// 2.3、 排序好后通过checkAllEndpointsAndHandlersAreUnique方法来确认唯一性
checkAllEndpointsAndHandlersAreUnique(handlers);
// 2.4、 注册 handlers
handlers.forEach(handler -> registerHandler(router, handler, log));

3.3、Netty启动的相关操作


3.3.1、 ChannelInitializer 初始化

// 3.1、 ChannelInitializer 初始化
ChannelInitializer<SocketChannel> initializer &#61;
new ChannelInitializer<SocketChannel>()
&#64;Override
protected void initChannel(SocketChannel ch) throws ConfigurationException
RouterHandler handler &#61; new RouterHandler(router, responseHeaders);
// SSL should be the first handler in the pipeline
if (isHttpsEnabled())
ch.pipeline()
.addLast(
"ssl",
new RedirectingSslHandler(
restAddress,
restAddressFuture,
sslHandlerFactory));

ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new FileUploadHandler(uploadDir))
.addLast(
new FlinkHttpObjectAggregator(
maxContentLength, responseHeaders));
for (InboundChannelHandlerFactory factory :
inboundChannelHandlerFactories)
Optional<ChannelHandler> channelHandler &#61;
factory.createHandler(configuration, responseHeaders);
if (channelHandler.isPresent())
ch.pipeline().addLast(channelHandler.get());


ch.pipeline()
.addLast(new ChunkedWriteHandler())
.addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(log, responseHeaders));

;

3.3.2、NioEventLoopGroup 初始化

NioEventLoopGroup bossGroup &#61;
new NioEventLoopGroup(
1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup &#61;
new NioEventLoopGroup(
0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
bootstrap &#61; new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioserverSocketChannel.class)
.childHandler(initializer);

3.3.3、绑定 rest endpoint

// 3.3、 Binding rest endpoint
// 3.3.1、获取可用端口范围
Iterator<Integer> portsIterator;
try
portsIterator &#61; NetUtils.getPortRangeFromString(restBindPortRange);
catch (IllegalConfigurationException e)
throw e;
catch (Exception e)
throw new IllegalArgumentException(
"Invalid port range definition: " &#43; restBindPortRange);

// 3.3.2、处理端口冲突 将逐一尝试端口是否可用
int chosenPort &#61; 0;
while (portsIterator.hasNext())
try
chosenPort &#61; portsIterator.next();
final ChannelFuture channel;
// 绑定address,port 获取 channel
if (restBindAddress &#61;&#61; null)
channel &#61; bootstrap.bind(chosenPort);
else
channel &#61; bootstrap.bind(restBindAddress, chosenPort);

serverChannel &#61; channel.syncUninterruptibly().channel();
break;
catch (final Exception e)
// syncUninterruptibly() throws checked exceptions via Unsafe
// continue if the exception is due to the port being in use, fail early
// otherwise
if (!(e instanceof java.net.BindException))
throw e;



if (serverChannel &#61;&#61; null)
throw new BindException(
"Could not start rest endpoint on any port in port range "
&#43; restBindPortRange);

log.debug("Binding rest endpoint to :.", restBindAddress, chosenPort);
final InetSocketAddress bindAddress &#61; (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
if (bindAddress.getAddress().isAnyLocalAddress())
advertisedAddress &#61; this.restAddress;
else
advertisedAddress &#61; bindAddress.getAddress().getHostAddress();

port &#61; bindAddress.getPort();
log.info("Rest endpoint listening at :", advertisedAddress, port);
restBaseUrl &#61; new URL(determineProtocol(), advertisedAddress, port, "").toString();

3.3.4、restAddress 启动成功

restAddressFuture.complete(restBaseUrl);

3.3.5、修改 EndPoint 状态为 RUNNING, 到这里 WebMonitorEndpoint 的 Netty 服务就启动完毕了

state &#61; State.RUNNING;

3.4、钩子来启动子类特定的服务。


/**
* Hook to start sub class specific services.
*
* &#64;throws Exception if an error occurred
*/

protected abstract void startInternal() throws Exception;

我们看下子类 startInternal 的 具体实现
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal

&#64;Override
public void startInternal() throws Exception
// 1、 节点选举
leaderElectionService.start(this);
startExecutionGraphCacheCleanupTask();
if (hasWebUI)
log.info("Web frontend listening at .", getRestBaseUrl());



3.4.1、 节点选举

HighAvailabilityServices 初始化, 根据 high-availability 的类型创建不同的 HighAvailabilityServices

leaderElectionService 初始化是在 WebMonitorEndpoint 创建时构建的。

highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),

3.4.1.1、以 standalone 模式启动为例 创建的 StandaloneLeaderElectionService 对象

&#64;Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService()
synchronized (lock)
checkNotShutdown();
return new StandaloneLeaderElectionService();


节点选举&#xff0c; 直接将 contender 设置为领导者&#xff0c; 此处的 contender 就是 WebMonitorEndpoint

&#64;Override
public void start(LeaderContender newContender) throws Exception
if (contender !&#61; null)
// Service was already started
throw new IllegalArgumentException(
"Leader election service cannot be started multiple times.");

contender &#61; Preconditions.checkNotNull(newContender);
// directly grant leadership to the given contender
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);


3.4.1.2、以 zookeeper HA 模式启动为例 创建的 DefaultLeaderElectionService对象

org.apache.flink.runtime.highavailability.AbstractHaServices#getClusterRestEndpointLeaderElectionService

&#64;Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService()
// 由子类实现 创建 选举leader服务
return createLeaderElectionService(getLeaderPathForRestServer());

子类实现

//org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#createLeaderElectionService
&#64;Override
protected LeaderElectionService createLeaderElectionService(String leaderPath)
return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);

// 创建 DefaultLeaderElectionService
// org.apache.flink.runtime.util.ZooKeeperUtils#createLeaderElectionService
public static DefaultLeaderElectionService createLeaderElectionService(
final CuratorFramework client, final String path)
return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));

DefaultLeaderElectionService 启动节点选举&#xff0c; 此处传入的 contender 就是 WebMonitorEndpoint

Flink的选举使用的是Curator框架&#xff0c;节点的选举针对每一个参选对象&#xff0c;会创建一个选举驱动leaderElectionDriver&#xff0c;在完成选举之后&#xff0c;会回调两个方法&#xff0c;如果选举成功会回调isLeader方法&#xff0c;如果竞选失败则回调notLeader方法。

// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start
&#64;Override
public final void start(LeaderContender contender) throws Exception
checkNotNull(contender, "Contender must not be null.");
Preconditions.checkState(leaderContender &#61;&#61; null, "Contender was already set.");
synchronized (lock)
running &#61; true;
/*
在WebMonitorEndpoint中调用时&#xff0c;此contender为DispatcherRestEndPoint
在ResourceManager中调用时,contender为ResourceManager
在DispatcherRunner中调用时,contender为DispatcherRunner
*/

leaderContender &#61; contender;

// 针对每一个参选对象&#xff0c;会创建一个选举驱动leaderElectionDriver
leaderElectionDriver &#61;
leaderElectionDriverFactory.createLeaderElectionDriver(
this,
new LeaderElectionFatalErrorHandler(),
leaderContender.getDescription());
LOG.info("Starting DefaultLeaderElectionService with .", leaderElectionDriver);


Flink的选举使用的是Curator框架&#xff0c;节点的选举针对每一个参选对象&#xff0c;会创建一个选举驱动leaderElectionDriver&#xff0c;在完成选举之后&#xff0c;会回调两个方法&#xff0c;如果选举成功会回调isLeader方法&#xff0c;如果竞选失败则回调notLeader方法。

ZooKeeperLeaderElectionDriverFactory 创建 ZooKeeperLeaderElectionDriver&#xff0c; LeaderElectionDriver负责执行领导选举和存储
领导信息。

// org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory#createLeaderElectionDriver
&#64;Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
LeaderElectionEventHandler leaderEventHandler, // DefaultLeaderElectionService对象
FatalErrorHandler fatalErrorHandler, // new LeaderElectionFatalErrorHandler()
String leaderContenderDescription)
throws Exception
return new ZooKeeperLeaderElectionDriver(
client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);


public ZooKeeperLeaderElectionDriver(
CuratorFramework client,
String path,
LeaderElectionEventHandler leaderElectionEventHandler, // 传入的是 DefaultLeaderElectionService
FatalErrorHandler fatalErrorHandler,
String leaderContenderDescription)
throws Exception
checkNotNull(path);
this.client &#61; checkNotNull(client);
this.connectionInformationPath &#61; ZooKeeperUtils.generateConnectionInformationPath(path);
this.leaderElectionEventHandler &#61; checkNotNull(leaderElectionEventHandler);
this.fatalErrorHandler &#61; checkNotNull(fatalErrorHandler);
this.leaderContenderDescription &#61; checkNotNull(leaderContenderDescription);
leaderLatchPath &#61; ZooKeeperUtils.generateLeaderLatchPath(path);
leaderLatch &#61; new LeaderLatch(client, leaderLatchPath);
this.cache &#61;
ZooKeeperUtils.createTreeCache(
client

推荐阅读
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 这篇文章 | 夕阳下的防火墙命令全解 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 深入解析 Golang 中 Context 的功能与应用
    本文详细探讨了 Golang 中 Context 的核心功能及其应用场景,通过深入解析其工作机制,帮助读者更好地理解和运用这一重要特性,对于提升代码质量和项目开发效率具有重要的参考价值。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • Spring Boot 实战(一):基础的CRUD操作详解
    在《Spring Boot 实战(一)》中,详细介绍了基础的CRUD操作,涵盖创建、读取、更新和删除等核心功能,适合初学者快速掌握Spring Boot框架的应用开发技巧。 ... [详细]
  • 本文作为“实现简易版Spring系列”的第五篇,继前文深入探讨了Spring框架的核心技术之一——控制反转(IoC)之后,将重点转向另一个关键技术——面向切面编程(AOP)。对于使用Spring框架进行开发的开发者来说,AOP是一个不可或缺的概念。了解AOP的背景及其基本原理,对于掌握这一技术至关重要。本文将通过具体示例,详细解析AOP的实现机制,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 深入解析Gradle中的Project核心组件
    在Gradle构建系统中,`Project` 是一个核心组件,扮演着至关重要的角色。通过使用 `./gradlew projects` 命令,可以清晰地列出当前项目结构中包含的所有子项目,这有助于开发者更好地理解和管理复杂的多模块项目。此外,`Project` 对象还提供了丰富的配置选项和生命周期管理功能,使得构建过程更加灵活高效。 ... [详细]
  • 利用 JavaScript 实现定时任务的高效执行方法(代码可直接复用) ... [详细]
  • MySQL 5.7 学习指南:SQLyog 中的主键、列属性和数据类型
    本文介绍了 MySQL 5.7 中主键(Primary Key)和自增(Auto-Increment)的概念,以及如何在 SQLyog 中设置这些属性。同时,还探讨了数据类型的分类和选择,以及列属性的设置方法。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 字节码开发笔记:深入解析与应用技巧 ... [详细]
author-avatar
晓亮居士_264
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有